BigQuery にデータを差分ロード(UPSERT)する方法まとめ
こんにちは、みかみです。
やりたいこと
- GCS に配置してあるファイルデータを、BigQuery に差分ロードしたい
- 重複チェックキーを指定して、キー重複レコードは後からロードするデータで上書きしたい
- ロード処理でエラーが発生した場合には、ロード前の状態にロールバックしたい(ロード前の状態を担保したい)
前提
GCS へのアクセスおよび BigQuery へのデータロードには、Python クライアントライブラリを使用します。
Python クライアントライブラリ実行環境およびクライアントライブラリで使用するサービスアカウントは準備済みです。
今回は、環境準備不要ですぐにクライアントライブラリが使える CLOUD SHELL を使用しました。
データを準備
以下のテストデータ生成ツールで確認用のデータを作成し、BigQuery にロードしました。
一見個人情報のようなデータですが、ツールでランダムに作成されたダミーデータです。
ここに、一部のレコードが重複した下記データを、差分ロードします。
id
項目を重複チェックキーとし、id=1
のデータは完全重複レコードではなく一部の項目値に差分があり、後からロードしたデータで上書きされるかどうか確認してみます。
id,name,kana,gender,mail,create_time,update_time 1,松崎 飛鳥,マツサキ アスカ,女,[email protected],1993/5/6 3:27:37,2001/4/10 20:30:22 7,柳原 日出男,ヤナギハラ ヒデオ,男,[email protected],1978/10/28 19:28:28,2020/6/9 23:36:00 8,葛西 竜一,カサイ リュウイチ,男,[email protected],1994/1/9 2:10:50,2013/5/28 9:37:02 9,嶋崎 秋夫,シマザキ アキオ,男,[email protected],2013/6/30 1:35:05,2007/3/26 2:07:30 10,藤永 和歌子,フジナガ ワカコ,女,[email protected],2001/5/8 12:39:06,1987/10/15 23:20:09 11,筒井 玲菜,ツツイ レナ,女,[email protected],2012/3/14 9:53:58,2013/5/27 17:50:28 12,羽鳥 哲雄,ハトリ テツオ,男,[email protected],2001/3/13 23:46:30,2004/8/20 1:43:35 13,梅村 栄太郎,ウメムラ エイタロウ,男,[email protected],1970/11/10 12:51:15,1980/9/17 12:41:36 14,土岐 恭之,ツチキ ヤスユキ,男,[email protected],2000/11/22 4:46:18,1970/3/30 19:16:33 15,川嶋 志穂,カワシマ シホ,女,[email protected],1992/4/10 22:46:35,1996/9/12 6:31:40
ウインドウ関数でユニークなレコードを抽出して上書き
BigQuery では SELECT
結果をテーブルに保存することが可能です。
また、クエリ実行オプションで、データ書込みモード(追記 or テーブルが空の場合のみ書込み or 全件洗い替え)を指定することができます
- クエリ結果をテーブルに保存する | BigQuery ドキュメント
- google.cloud.bigquery.job.QueryJobConfig | Python Client for Google BigQuery
- google.cloud.bigquery.job.WriteDisposition | Python Client for Google BigQuery
GCS ファイルデータを一時テーブルにロードした後、ウインドウ関数(分析関数)でオリジナルテーブルと一時テーブルデータのユニークレコードを抽出し、オリジナルテーブルのデータを洗い替えます。
重複チェックキー( id
)が同一のレコードがある場合には、update_time
項目値が最新のレコードを抽出します。
SELECT * EXCEPT(row_num) FROM ( SELECT *, ROW_NUMBER() OVER (PARTITION BY id ORDER BY update_time) as row_num FROM ( SELECT * FROM `cm-da-mikami-yuki-258308.dataset_1.table_load_diff` UNION DISTINCT SELECT * FROM `cm-da-mikami-yuki-258308.dataset_1.table_load_diff_temp` ) ) WHERE row_num = 1
以下の Python コードを実行します。
from google.cloud import bigquery uri = "gs://test-mikami/load_data/test_data.csv" table_id = 'cm-da-mikami-yuki-258308.dataset_1.table_load_diff' table_id_temp = '{}_temp'.format(table_id) query = ( 'SELECT * EXCEPT(row_num) FROM (' 'SELECT *, ROW_NUMBER() OVER (PARTITION BY id ORDER BY update_time) as row_num FROM (' 'SELECT * FROM `{}` UNION DISTINCT SELECT * FROM `{}`' ')' ') WHERE row_num = 1' ).format(table_id, table_id_temp) client = bigquery.Client() try: # load data to temp table job_config = bigquery.LoadJobConfig() job_config.source_format = bigquery.SourceFormat.CSV job_config.write_disposition = 'WRITE_TRUNCATE' job_config.autodetect = True table = bigquery.Table(table_id_temp) job = client.load_table_from_uri(uri, table, job_config=job_config) print("\tStarting job {}".format(job.job_id)) job.result() print("table: {} Loaded from uri.".format(table.table_id)) # insert unique data to target table job_config = bigquery.QueryJobConfig(destination=table_id, write_disposition='WRITE_TRUNCATE') job = client.query(query, job_config=job_config) print("\tStarting job {}".format(job.job_id)) job.result() print("insert comp.") except Exception as e: print(e) finally: # drop temp table client.delete_table(table_id_temp, not_found_ok=True)
gcp_da_user@cloudshell:~/load_diff (cm-da-mikami-yuki-258308)$ python3 load_diff.py Starting job 738d8fc3-eb52-4365-bd13-7629649ca710 table: table_load_diff_temp Loaded from uri. Starting job 46ce527d-34fd-4882-88a5-8072b438ca88 insert comp.
キー重複レコードは上書きされ、差分データは追加でロードされたことが確認できました。
では、クエリ実行時にエラーが発生した場合、元データは削除されることなくクエリ実行前の状態が維持されるかどうか確認してみます。
先ほどの Python コードで、クエリエラーが発生するように SQL を変更して実行しました。
(省略) query = ( 'SELECT * EXCEPT(row_num) FROM (' 'SELECT *, ROW_NUMBER() OVER (PARTITION BY id ORDER BY update_time) as row_num FROM (' # 'SELECT * FROM `{}` UNION DISTINCT SELECT * FROM `{}`' 'SELECT * FROM `{}_test` UNION DISTINCT SELECT * FROM `{}`' ')' ') WHERE row_num = 1' ).format(table_id, table_id_temp) (省略)
gcp_da_user@cloudshell:~/load_diff (cm-da-mikami-yuki-258308)$ python3 load_diff.py Starting job c985d5df-82b9-4819-82dd-68c5bfffd082 table: table_load_diff_temp Loaded from uri. Starting job ab9095f8-c493-4168-aae4-e8e3ee5b8714 404 Not found: Table cm-da-mikami-yuki-258308:dataset_1.table_load_diff_test was not found in location asia-northeast1 (job ID: ab9095f8-c493-4168-aae4-e8e3ee5b8714) -----Query Job SQL Follows----- | . | . | . | . | . | . | . | . | . | . | . | . | . | . | . | . | . | . | . | . | . | . | . | . | . | . | . | . | . | 1:SELECT * EXCEPT(row_num) FROM (SELECT *, ROW_NUMBER() OVER (PARTITION BY id ORDER BY update_time) as row_num FROM (SELECT * FROM `cm-da-mikami-yuki-258308.dataset_1.table_load_diff_test` UNION DISTINCT SELECT * FROM `cm-da-mikami-yuki-258308.dataset_1.table_load_diff_temp`)) WHERE row_num = 1 | . | . | . | . | . | . | . | . | . | . | . | . | . | . | . | . | . | . | . | . | . | . | . | . | . | . | . | . | . |
想定通りクエリ実行エラーです。
テーブルを確認してみると、Python 実行前のデータがそのまま格納されていることが確認できました。
MERGE
構文で ロードデータとテーブルデータをマージ
BigQuery では、MERGE
構文を使って、複数テーブルのデータをマージすることができます。
一時テーブルに GCS ファイルデータをロードした後、キー重複があった場合は一時テーブルデータで上書き、キー重複がない場合は一時テーブルデータを本テーブルに INSERT する以下の SQL で、オリジナルテーブルに差分データをマージします。
MERGE `cm-da-mikami-yuki-258308.dataset_1.table_load_diff` target USING `cm-da-mikami-yuki-258308.dataset_1.table_load_diff_temp` stg ON target.id = stg.id WHEN MATCHED THEN UPDATE SET id = stg.id, name = stg.name, kana = stg.kana, gender = stg.gender, mail = stg.mail, create_time = stg.create_time, update_time = stg.update_time WHEN NOT MATCHED THEN INSERT(id, name, kana, gender, mail, create_time, update_time) VALUES(id, name, kana, gender, mail, create_time, update_time)
以下の Python コードを実行します。
from google.cloud import bigquery uri = "gs://test-mikami/load_data/test_data.csv" table_id = 'cm-da-mikami-yuki-258308.dataset_1.table_load_diff' table_id_temp = '{}_temp'.format(table_id) query = ( 'MERGE `{}` target USING `{}` stg ' 'ON target.id = stg.id ' 'WHEN MATCHED THEN ' 'UPDATE SET id = stg.id, name = stg.name, kana = stg.kana, gender = stg.gender, mail = stg.mail, create_time = stg.create_time, update_time = stg.update_time ' 'WHEN NOT MATCHED THEN ' 'INSERT(id, name, kana, gender, mail, create_time, update_time) ' 'VALUES(id, name, kana, gender, mail, create_time, update_time)' ).format(table_id, table_id_temp) client = bigquery.Client() try: # load data to temp table job_config = bigquery.LoadJobConfig() job_config.source_format = bigquery.SourceFormat.CSV job_config.write_disposition = 'WRITE_TRUNCATE' job_config.autodetect = True table = bigquery.Table(table_id_temp) job = client.load_table_from_uri(uri, table, job_config=job_config) print("\tStarting job {}".format(job.job_id)) job.result() print("table: {} Loaded from uri.".format(table.table_id)) # merge data job = client.query(query) job.result() except Exception as e: print(e) finally: # drop temp table client.delete_table(table_id_temp, not_found_ok=True)
gcp_da_user@cloudshell:~/load_diff (cm-da-mikami-yuki-258308)$ python3 merge.py Starting job 23834019-33b9-41fb-a5a3-7663de6d1b6f table: table_load_diff_temp Loaded from uri.
テーブルデータを確認してみます。
差分ロードできていることが確認できました。
続いてエラーケースの挙動を確認するため、実行する MERGE
クエリを不正な SQL に書き換えて再度実行してみます。
(省略) query = ( 'MERGE `{}` target USING `{}` stg ' 'ON target.id = stg.id ' 'WHEN MATCHED THEN ' 'UPDATE SET id = stg.id, name = stg.name, kana = stg.kana, gender = stg.gender, mail = stg.mail, create_time = stg.create_time, update_time = stg.update_time ' 'WHEN NOT MATCHED THEN ' # 'INSERT(id, name, kana, gender, mail, create_time, update_time) ' # 'VALUES(id, name, kana, gender, mail, create_time, update_time)' 'INSERT(id, name, kana, gender, mail, create_time, update_time, temp) ' 'VALUES(id, name, kana, gender, mail, create_time, update_time, temp)' ).format(table_id, table_id_temp) (省略)
gcp_da_user@cloudshell:~/load_diff (cm-da-mikami-yuki-258308)$ python3 merge.py Starting job 80fd2482-9625-46fd-91e9-b8aed0d0dbf9 table: table_load_diff_temp Loaded from uri. 400 Column temp is not present in target table at [1:417] (job ID: 9d011493-4f00-4431-ad95-4dcc44943304) -----Query Job SQL Follows----- | . | . | . | . | . | . | . | . | . | . | . | . | . | . | . | . | . | . | . | . | . | . | . | . | . | . | . | . | . | . | . | . | . | . | . | . | . | . | . | . | . | . | . | . | . | . | . | . | . | 1:MERGE `cm-da-mikami-yuki-258308.dataset_1.table_load_diff` target USING `cm-da-mikami-yuki-258308.dataset_1.table_load_diff_temp` stg ON target.id = stg.id WHEN MATCHED THEN UPD ATE SET id = stg.id, name = stg.name, kana = stg.kana, gender = stg.gender, mail = stg.mail, create_time = stg.create_time, update_time = stg.update_time WHEN NOT MATCHED THEN INSERT (id, name, kana, gender, mail, create_time, update_time, temp) VALUES(id, name, kana, gender, mail, create_time, update_time, temp) | . | . | . | . | . | . | . | . | . | . | . | . | . | . | . | . | . | . | . | . | . | . | . | . | . | . | . | . | . | . | . | . | . | . | . | . | . | . | . | . | . | . | . | . | . | . | . | . | . |
エラーケースでも、マージ前のテーブルデータが担保されることが確認できました。
テーブルデータを GCS に export 後、重複削除したファイルデータを全件洗い替えでロード
これまでのウインドウ関数や MERGE
を使った SQL の実行には、処理データ量に従って課金が発生しますが、GCS から BigQuery へのデータロードや BigQuery から GCSへのデータエクスポートには料金はかかりません。
オリジナルテーブルデータを一度 GCS にエクスポートし、サーバのプログラムで差分データをマージして GCS にファイル出力した後、出力ファイルを BigQuery にロードします。
データのマージはサーバの Python で処理するためクエリ料金がかからずに済みますが、サーバでのオンメモリ処理となるため、テーブルデータが多い場合には適さない処理です。 日々積みあがっていくトランザクションデータなどでは難しいと思いますが、社員データなど、全体量がそれほど多くなく(オンメモリ処理が可能で)追加や更新があるデータでは、コスト削減のために検討の余地がある方法かと思います。
以下の Python コードを実行します。
from google.cloud import bigquery from google.cloud import storage from io import BytesIO import pandas as pd from datetime import datetime as dt project_id = "cm-da-mikami-yuki-258308" dataset_id = "dataset_1" table_name = "table_load_diff" table_id = '{}.{}.{}'.format(project_id, dataset_id, table_name) bucket_name = 'test-mikami' uri = "gs://{}/load_data/test_data.csv".format(bucket_name) blob_export = "load_data/{}_export.csv".format(table_name) uri_dst = "gs://{}/{}".format(bucket_name, blob_export) blob_load = "load_data/{}_unique.csv".format(table_name) uri_load = "gs://{}/{}".format(bucket_name, blob_load) client = bigquery.Client() gcs = storage.Client() try: # export target table data to GCS dataset_ref = bigquery.DatasetReference(project_id, dataset_id) table_ref = dataset_ref.table(table_name) job = client.extract_table(table_ref, uri_dst) print("\tStarting job {}".format(job.job_id)) job.result() print("export: {}.".format(uri_dst)) # merge data buffer = BytesIO() with buffer as stream: gcs.download_blob_to_file(uri, stream) buffer.seek(0) df = pd.read_csv(buffer) df['create_time'] = df['create_time'].map(lambda x: dt.strptime(x, '%Y/%m/%d %H:%M:%S')) df['update_time'] = df['update_time'].map(lambda x: dt.strptime(x, '%Y/%m/%d %H:%M:%S')) buffer = BytesIO() with buffer as stream: gcs.download_blob_to_file(uri_dst, stream) buffer.seek(0) df_org = pd.read_csv(buffer) df_org['create_time'] = df_org['create_time'].map(lambda x: dt.strptime(x[0:19], '%Y-%m-%d %H:%M:%S')) df_org['update_time'] = df_org['update_time'].map(lambda x: dt.strptime(x[0:19], '%Y-%m-%d %H:%M:%S')) df = pd.concat([df, df_org], ignore_index=True).drop_duplicates(subset='id', keep='first') bucket = gcs.get_bucket(bucket_name) blob = bucket.blob(blob_load) blob.upload_from_string(df.to_csv(index=False)) print("upload: {}.".format(blob_load)) # load merge data to target table job_config = bigquery.LoadJobConfig() job_config.source_format = bigquery.SourceFormat.CSV job_config.write_disposition = 'WRITE_TRUNCATE' job_config.skip_leading_rows = 1 table = bigquery.Table(table_id) job = client.load_table_from_uri(uri_load, table, job_config=job_config) print("\tStarting job {}".format(job.job_id)) job.result() print("table: {} Loaded from uri.".format(table.table_id)) except Exception as e: print(e) finally: # delete temp files bucket = gcs.get_bucket(bucket_name) bucket.delete_blob(blob_export) bucket.delete_blob(blob_load)
gcp_da_user@cloudshell:~/load_diff (cm-da-mikami-yuki-258308)$ python3 merge_file.py Starting job df3c79f0-4d88-4b79-900a-4380de8077fa export: gs://test-mikami/load_data/table_load_diff_export.csv. upload: load_data/table_load_diff_unique.csv. Starting job ff9bceda-b201-497b-8b2f-ee26bc0424f9 table: table_load_diff Loaded from uri.
プログラム側で処理するため多少長いコードになってしまいましたが、想定通りデータが差分ロードされたことが確認できました。
念のため、エラーが発生した場合の挙動も確認してみます。 差分データとして作成したファイルにテーブルカラムがない項目を追加して、データロード時にエラーが発生するようにしました。
(省略) df = pd.concat([df, df_org], ignore_index=True).drop_duplicates(subset='id', keep='first') df['temp'] = 'test' (省略) blob.upload_from_string(df.to_csv(index=False)) print("upload: {}.".format(blob_load)) (省略) job = client.load_table_from_uri(uri_load, table, job_config=job_config) print("\tStarting job {}".format(job.job_id)) job.result() print("table: {} Loaded from uri.".format(table.table_id)) (省略)
gcp_da_user@cloudshell:~/load_diff (cm-da-mikami-yuki-258308)$ python3 merge_file.py Starting job 149ee927-b458-4c31-b996-9741953f0afd export: gs://test-mikami/load_data/table_load_diff_export.csv. upload: load_data/table_load_diff_unique.csv. Starting job c37c050f-ad12-482b-af95-70a65ef35bbc 400 Error while reading data, error message: CSV table encountered too many errors, giving up. Rows: 1; errors: 1. Please look into the errors[] collection for more details.
期待通り、エラー発生前のデータが保持されていることが確認できました。
ストリーミングインサートで重複チェックキーを指定してロード
ストリーミングインサートを使えば、重複チェックキーを指定して重複データを削除しながら後勝ちでデータをテーブルにインサートすることができます。
GCS のファイルを読み込んでストリーミング処理で BigQuery にロードします。こちらもファイルデータを一度サーバの Python プログラムでオンメモリ処理する必要がありますが、一時テーブルなど作成する必要がなくシンプルに実装できます。
ストリーミング処理でデータロードする場合、TIMESTAMP 型のデータフォーマットは YYYY-MM-DD HH:MM[:SS[.SSSSSS]]
に変換する必要があるようです。
スラッシュ区切りの日付データをロードしようとしたところ、以下のエラーが発生しました。
Could not parse '1993/5/6 3:27:37' as a timestamp. Required format is YYYY-MM-DD HH:MM[:SS[.SSSSSS]]
重複チェックキーを指定する必要があるため、差分ロード前のテーブルデータも、ストリーミングインサートで準備しました。
以下の Python コードで、一部重複データを差分ロードします。
from google.cloud import bigquery from google.cloud import storage from io import BytesIO import pandas as pd from datetime import datetime as dt table_id = 'cm-da-mikami-yuki-258308.dataset_1.table_load_diff_stream' uri = "gs://test-mikami/load_data/test_data.csv" client = bigquery.Client() gcs = storage.Client() try: # read file data. buffer = BytesIO() with buffer as stream: gcs.download_blob_to_file(uri, stream) buffer.seek(0) df = pd.read_csv(buffer) df['create_time'] = df['create_time'].map(lambda x: dt.strptime(x, '%Y/%m/%d %H:%M:%S').strftime('%Y-%m-%d %H:%M:%S')) df['update_time'] = df['update_time'].map(lambda x: dt.strptime(x, '%Y/%m/%d %H:%M:%S').strftime('%Y-%m-%d %H:%M:%S')) # insert rows. table = client.get_table(table_id) errors = client.insert_rows_from_dataframe(table, df, row_ids=list(df['id'])) if len(errors) != [[]]: print(errors) except Exception as e: print(e)
正常に実行できたので、テーブルデータを確認します。
これまで同様、後からロードしたデータで上書きされ、差分データがロードされたことが確認できました。
エラーケースも確認してみます。 ロードデータの最終行に不正なフォーマットのデータを追加し、エラーが発生するように変更して実行してみます。
(省略) df.append({'id': 99, 'name': 'test error', 'kana': '', 'gender': '', 'mail': '', 'create_time':'2020/07/17 17:00:00', 'update_time':'2020/07/17 17:00:00'}, ignore_index=True) print(df) # insert rows. table = client.get_table(table_id) errors = client.insert_rows_from_dataframe(table, df, row_ids=list(df['id'])) (省略)
gcp_da_user@cloudshell:~/load_diff (cm-da-mikami-yuki-258308)$ python3 stream.py id name kana gender mail create_time update_time 0 1 松崎 飛鳥 マツサキ アスカ 女 [email protected] 1993-05-06 03:27:37 2001-04-10 20:30:22 1 7 柳原 日出男 ヤナギハラ ヒデオ 男 [email protected] 1978-10-28 19:28:28 2020-06-09 23:36:00 2 8 葛西 竜一 カサイ リュウイチ 男 [email protected] 1994-01-09 02:10:50 2013-05-28 09:37:02 3 9 嶋崎 秋夫 シマザキ アキオ 男 [email protected] 2013-06-30 01:35:05 2007-03-26 02:07:30 4 10 藤永 和歌子 フジナガ ワカコ 女 [email protected] 2001-05-08 12:39:06 1987-10-15 23:20:09 5 11 筒井 玲菜 ツツイ レナ 女 [email protected] 2012-03-14 09:53:58 2013-05-27 17:50:28 6 12 羽鳥 哲雄 ハトリ テツオ 男 [email protected] 2001-03-13 23:46:30 2004-08-20 01:43:35 7 13 梅村 栄太郎 ウメムラ エイタロウ 男 [email protected] 1970-11-10 12:51:15 1980-09-17 12:41:36 8 14 土岐 恭之 ツチキ ヤスユキ 男 [email protected] 2000-11-22 04:46:18 1970-03-30 19:16:33 9 15 川嶋 志穂 カワシマ シホ 女 [email protected] 1992-04-10 22:46:35 1996-09-12 06:31:40 10 99 test error 2020/07/17 17:00:00 2020/07/17 17:00:00 [[{'index': 10, 'errors': [{'reason': 'invalid', 'location': 'update_time', 'debugInfo': '', 'message': "Could not parse '2020/07/17 17:00:00' as a timestamp. Required format is YYYY -MM-DD HH:MM[:SS[.SSSSSS]]"}]}, {'index': 0, 'errors': [{'reason': 'stopped', 'location': '', 'debugInfo': '', 'message': ''}]}, {'index': 1, 'errors': [{'reason': 'stopped', 'locati on': '', 'debugInfo': '', 'message': ''}]}, {'index': 2, 'errors': [{'reason': 'stopped', 'location': '', 'debugInfo': '', 'message': ''}]}, {'index': 3, 'errors': [{'reason': 'stopp ed', 'location': '', 'debugInfo': '', 'message': ''}]}, {'index': 4, 'errors': [{'reason': 'stopped', 'location': '', 'debugInfo': '', 'message': ''}]}, {'index': 5, 'errors': [{'rea son': 'stopped', 'location': '', 'debugInfo': '', 'message': ''}]}, {'index': 6, 'errors': [{'reason': 'stopped', 'location': '', 'debugInfo': '', 'message': ''}]}, {'index': 7, 'err ors': [{'reason': 'stopped', 'location': '', 'debugInfo': '', 'message': ''}]}, {'index': 8, 'errors': [{'reason': 'stopped', 'location': '', 'debugInfo': '', 'message': ''}]}, {'ind ex': 9, 'errors': [{'reason': 'stopped', 'location': '', 'debugInfo': '', 'message': ''}]}]]
一部の正常行のデータだけがロードされるようなこともなく、ロード実行前のデータが保証されることが確認できました。
まとめ(所感)
OLAP 用途の BigQuery としてはソースデータはレコードの重複など考慮する必要はなく、分析に使用するマートデータ作成時点で必要に応じて重複レコードを削除すればよいというスタンスなのかと思いますが、特に他のデータベースサービスからの移行時など、ソースデータを差分ロードしたいケースはあるかと思います。
複数クエリのトランザクション処理をサポートしていない BigQuery では処理によってはロールバックの考慮も必要になりますが、処理内容を検討することによりエラー発生時の複雑な考慮も必要なく、差分ロードの実装も簡単に実現できることが確認できました。